package org.apache.celestial.service;

import org.apache.celestial.connector.ConnectorConstants;
import org.apache.celestial.core.stream.EnvironmentContext;
import org.apache.celestial.entity.IOTContent;
import org.apache.celestial.entity.IOTException;
import org.apache.celestial.entity.IOTRawData;
import org.apache.celestial.entity.IotLog;
import org.apache.celestial.repo.IotLogRepository;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.springframework.beans.factory.InitializingBean;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.core.task.SimpleAsyncTaskExecutor;
import org.springframework.stereotype.Component;

import java.util.Properties;
import java.util.logging.Level;
import java.util.logging.Logger;
import java.util.stream.Stream;

@Component
public class KafkaService implements InitializingBean {

    @Autowired
    EnvironmentContext<String> environmentContext;

    @Autowired
    IotLogRepository iotLogRepository;

    @Override
    public void afterPropertiesSet() {
        new SimpleAsyncTaskExecutor("kafka-connector-task").execute(() -> {

            Properties props = new Properties();
            props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "192.168.11.205:9092");
            props.put(ConsumerConfig.GROUP_ID_CONFIG, "test-stream-group-1");
            props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "true");
            props.put(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG, "1000");
            props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "latest");
            props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringDeserializer");
            props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringDeserializer");
            props.put("security.protocol", "SASL_PLAINTEXT");
            props.put("sasl.mechanism", "PLAIN");
            props.put("sasl.jaas.config", "org.apache.kafka.common.security.plain.PlainLoginModule required username=\"admin\" password=\"Kunfei@1405#\";");
            props.put(ConnectorConstants.KAFKA_TOPIC, "cloud_event_iot_raw_data");

            try {
                Stream<String> kafkaConnector = environmentContext.createKafkaConnector(props);
                kafkaConnector.map(content -> {
                    try {
                        return IOTRawData.fromString(content);
                    } catch (IOTException e) {
                        e.printStackTrace();
                    }
                    return null;
                }).filter(iotRawData -> iotRawData != null)
                        .map(iotRawData -> {
                            IotLog iotLog = new IotLog();
                            iotLog.setMessageId(iotRawData.getMessageId());
                            iotLog.setOperationTime(iotRawData.getOperationTime());
                            if (iotRawData.getContent() != null) {
                                iotLog.setProductKey(iotRawData.getContent().getProductKey());
                                iotLog.setDeviceName(iotRawData.getContent().getDeviceName());
                            }
                            return iotLog;
                        }).forEach(iotLog -> iotLogRepository.save(iotLog));
            } catch (Exception e) {
                e.printStackTrace();
            }

        });
    }
}
